agentmux_srv\backend\blockcontroller/
acp.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! AcpController: manages agent CLIs that speak the Agent Client Protocol (ACP).
5//!
6//! ACP is a JSON-RPC 2.0 protocol over stdin/stdout — the "LSP for AI agents."
7//! Instead of custom per-provider output parsing, ACP provides a standardized
8//! protocol for session management, prompting, and streaming events.
9//!
10//! Lifecycle:
11//!   1. Spawn the ACP agent process (e.g., `gemini --acp`, `openclaw acp`)
12//!   2. Send `initialize` request, receive capabilities
13//!   3. Send `initialized` notification
14//!   4. Create a session via `session/create`
15//!   5. For each user turn: send `session/prompt`, stream `session/update` notifications
16//!   6. On close: send `shutdown` + `exit`
17//!
18//! I/O model (similar to PersistentSubprocessController):
19//!   - stdin_writer: sends JSON-RPC requests/notifications to agent
20//!   - stdout_reader: reads JSON-RPC responses/notifications, persists + broadcasts
21//!   - process_waiter: monitors process lifecycle
22//!
23//! See: https://github.com/agentclientprotocol/agent-client-protocol
24
25use std::collections::HashMap;
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Arc, Mutex};
28
29use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
30use tokio::sync::mpsc;
31
32use super::{
33    BlockControllerRuntimeStatus, BlockInputUnion, Controller, STATUS_DONE, STATUS_INIT,
34    STATUS_RUNNING,
35};
36use super::health::HealthMonitor;
37use crate::backend::eventbus::EventBus;
38use crate::backend::storage::filestore::FileStore;
39use crate::backend::storage::wstore::WaveStore;
40use crate::backend::wps;
41
42/// WPS file subject name for ACP output.
43pub const ACP_OUTPUT_SUBJECT: &str = "output";
44
45/// Controller type constant.
46pub const BLOCK_CONTROLLER_ACP: &str = "acp";
47
48/// Inner state protected by mutex.
49struct AcpInner {
50    proc_status: String,
51    proc_exit_code: i32,
52    status_version: i32,
53    session_id: Option<String>,
54    current_pid: Option<u32>,
55    stdin_tx: Option<mpsc::Sender<String>>,
56    kill_tx: Option<tokio::sync::oneshot::Sender<bool>>,
57    /// First user prompt, deferred until session/create completes.
58    pending_prompt: Option<String>,
59}
60
61/// AcpController manages an ACP-speaking agent process.
62pub struct AcpController {
63    #[allow(dead_code)]
64    tab_id: String,
65    block_id: String,
66    inner: Arc<Mutex<AcpInner>>,
67    broker: Option<Arc<wps::Broker>>,
68    event_bus: Option<Arc<EventBus>>,
69    wstore: Option<Arc<WaveStore>>,
70    filestore: Option<Arc<FileStore>>,
71    health_monitor: Arc<HealthMonitor>,
72    /// Monotonically increasing JSON-RPC request ID.
73    next_rpc_id: Arc<AtomicU64>,
74}
75
76impl AcpController {
77    pub fn new(
78        tab_id: String,
79        block_id: String,
80        broker: Option<Arc<wps::Broker>>,
81        event_bus: Option<Arc<EventBus>>,
82        wstore: Option<Arc<WaveStore>>,
83        filestore: Option<Arc<FileStore>>,
84    ) -> Self {
85        let health_monitor = Arc::new(HealthMonitor::new(
86            block_id.clone(),
87            broker.clone(),
88        ));
89        Self {
90            tab_id,
91            block_id,
92            inner: Arc::new(Mutex::new(AcpInner {
93                proc_status: STATUS_INIT.to_string(),
94                proc_exit_code: 0,
95                status_version: 0,
96                session_id: None,
97                current_pid: None,
98                stdin_tx: None,
99                kill_tx: None,
100                pending_prompt: None,
101            })),
102            broker,
103            event_bus,
104            wstore,
105            filestore,
106            health_monitor,
107            next_rpc_id: Arc::new(AtomicU64::new(1)),
108        }
109    }
110
111    fn next_id(&self) -> u64 {
112        self.next_rpc_id.fetch_add(1, Ordering::Relaxed)
113    }
114
115    fn set_status(inner: &mut AcpInner, status: &str) {
116        inner.proc_status = status.to_string();
117        inner.status_version += 1;
118    }
119
120    fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
121        let inner = self.inner.lock().unwrap();
122        BlockControllerRuntimeStatus {
123            blockid: self.block_id.clone(),
124            version: inner.status_version,
125            shellprocstatus: inner.proc_status.clone(),
126            shellprocconnname: "local".to_string(),
127            shellprocexitcode: inner.proc_exit_code,
128            spawn_ts_ms: None,
129            is_agent_pane: true,
130        }
131    }
132
133    fn publish_status(&self) {
134        if let Some(ref broker) = self.broker {
135            let status = self.get_status_snapshot();
136            super::publish_controller_status(broker, &status);
137        }
138    }
139
140    fn is_running(&self) -> bool {
141        let inner = self.inner.lock().unwrap();
142        inner.stdin_tx.is_some()
143    }
144
145    /// Build a JSON-RPC 2.0 request.
146    fn make_request(&self, method: &str, params: serde_json::Value) -> String {
147        let id = self.next_id();
148        serde_json::json!({
149            "jsonrpc": "2.0",
150            "id": id,
151            "method": method,
152            "params": params,
153        }).to_string()
154    }
155
156    /// Build a JSON-RPC 2.0 notification (no id field).
157    fn make_notification(&self, method: &str, params: serde_json::Value) -> String {
158        serde_json::json!({
159            "jsonrpc": "2.0",
160            "method": method,
161            "params": params,
162        }).to_string()
163    }
164
165    /// Send a user message via ACP session/prompt.
166    /// If the process isn't spawned yet, spawns it first (the first prompt is
167    /// deferred until the session is established — see `pending_prompt`).
168    pub fn send_message(&self, message: String, cli_command: String, cli_args: Vec<String>, working_dir: String, env_vars: HashMap<String, String>) -> Result<(), String> {
169        if !self.is_running() {
170            // First turn: spawn and stash the prompt — the stdout reader will
171            // send it once the session/create response arrives.
172            {
173                let mut inner = self.inner.lock().unwrap();
174                inner.pending_prompt = Some(message.clone());
175            }
176            self.health_monitor.set_active_turn(true);
177            return self.spawn_process(cli_command, cli_args, working_dir, env_vars);
178        }
179
180        // Subsequent turns: session_id is already populated.
181        let session_id = {
182            let inner = self.inner.lock().unwrap();
183            inner.session_id.clone().unwrap_or_default()
184        };
185
186        let req = self.make_request("session/prompt", serde_json::json!({
187            "sessionId": session_id,
188            "prompt": {
189                "type": "text",
190                "text": message,
191            }
192        }));
193
194        self.health_monitor.set_active_turn(true);
195
196        let inner = self.inner.lock().unwrap();
197        let tx = inner.stdin_tx.as_ref()
198            .ok_or("ACP process not running after spawn")?;
199        tx.try_send(req)
200            .map_err(|e| format!("ACP stdin send failed: {e}"))
201    }
202
203    /// Spawn the ACP agent process and perform the initialize handshake.
204    fn spawn_process(&self, cli_command: String, cli_args: Vec<String>, working_dir: String, env_vars: HashMap<String, String>) -> Result<(), String> {
205        let mut cmd = crate::server::cli_handlers::make_cli_cmd(&cli_command);
206        cmd.args(&cli_args);
207
208        // Working directory
209        if !working_dir.is_empty() {
210            let expanded_dir = if working_dir.starts_with("~/") || working_dir == "~" {
211                if let Some(home) = dirs::home_dir() {
212                    home.join(working_dir.trim_start_matches("~/")).to_string_lossy().to_string()
213                } else {
214                    working_dir.clone()
215                }
216            } else {
217                working_dir.clone()
218            };
219            let dir_path = std::path::Path::new(&expanded_dir);
220            if !dir_path.exists() {
221                if let Err(e) = std::fs::create_dir_all(dir_path) {
222                    tracing::warn!(
223                        block_id = %self.block_id,
224                        dir = %expanded_dir,
225                        error = %e,
226                        "failed to create working directory for ACP agent"
227                    );
228                }
229            }
230            if dir_path.exists() {
231                cmd.current_dir(&expanded_dir);
232            }
233        }
234
235        // Environment variables
236        for (k, v) in &env_vars {
237            let expanded = crate::backend::base::expand_home_dir_safe(v);
238            cmd.env(k, expanded.to_string_lossy().as_ref());
239        }
240
241        // On Windows: suppress console-window allocation. Without CREATE_NO_WINDOW,
242        // node.exe spawned from a windowless sidecar may try to create/attach to a
243        // console, causing stdout to go to that console rather than the pipe.
244        #[cfg(windows)]
245        {
246            const CREATE_NO_WINDOW: u32 = 0x0800_0000;
247            cmd.creation_flags(CREATE_NO_WINDOW);
248        }
249
250        cmd.stdin(std::process::Stdio::piped());
251        cmd.stdout(std::process::Stdio::piped());
252        cmd.stderr(std::process::Stdio::piped());
253
254        let mut child = cmd.spawn().map_err(|e| {
255            tracing::error!(block_id = %self.block_id, error = %e, "ACP process spawn failed");
256            format!("failed to spawn ACP process: {e}")
257        })?;
258
259        let pid = child.id().unwrap_or(0);
260
261        tracing::info!(
262            block_id = %self.block_id,
263            pid = pid,
264            cmd = %cli_command,
265            args = ?cli_args,
266            "ACP agent process spawned"
267        );
268
269        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<bool>();
270        let stdin = child.stdin.take().unwrap();
271        let stdout = child.stdout.take().unwrap();
272        let stderr = child.stderr.take();
273
274        // Drain stderr with explicit error/EOF logging
275        if let Some(stderr_pipe) = stderr {
276            let block_id_stderr = self.block_id.clone();
277            tokio::spawn(async move {
278                let mut reader = BufReader::new(stderr_pipe).lines();
279                loop {
280                    match reader.next_line().await {
281                        Err(e) => {
282                            tracing::warn!(block_id = %block_id_stderr, error = %e, "ACP stderr read error");
283                            break;
284                        }
285                        Ok(None) => break,
286                        Ok(Some(line)) => {
287                            if !line.trim().is_empty() {
288                                tracing::warn!(
289                                    block_id = %block_id_stderr,
290                                    line = %line,
291                                    "ACP agent stderr"
292                                );
293                            }
294                        }
295                    }
296                }
297            });
298        }
299
300        // Stdin writer channel
301        let (msg_tx, mut msg_rx) = mpsc::channel::<String>(32);
302
303        {
304            let mut inner = self.inner.lock().unwrap();
305            inner.current_pid = Some(pid);
306            inner.kill_tx = Some(kill_tx);
307            inner.stdin_tx = Some(msg_tx.clone());
308            Self::set_status(&mut inner, STATUS_RUNNING);
309        }
310        self.publish_status();
311
312        // Spawn stdin writer task
313        let block_id_stdin = self.block_id.clone();
314        tokio::spawn(async move {
315            let mut stdin = tokio::io::BufWriter::new(stdin);
316            while let Some(line) = msg_rx.recv().await {
317                if let Err(e) = stdin.write_all(line.as_bytes()).await {
318                    tracing::error!(block_id = %block_id_stdin, error = %e, "ACP stdin write error");
319                    break;
320                }
321                if let Err(e) = stdin.write_all(b"\n").await {
322                    tracing::error!(block_id = %block_id_stdin, error = %e, "ACP stdin newline error");
323                    break;
324                }
325                if let Err(e) = stdin.flush().await {
326                    tracing::error!(block_id = %block_id_stdin, error = %e, "ACP stdin flush error");
327                    break;
328                }
329            }
330        });
331
332        // Spawn stdout reader task — reads NDJSON lines and broadcasts via WPS
333        let block_id_stdout = self.block_id.clone();
334        let broker_clone = self.broker.clone();
335        let filestore_clone = self.filestore.clone();
336        let inner_clone = self.inner.clone();
337        let health_clone = self.health_monitor.clone();
338        let rpc_id_clone = self.next_rpc_id.clone();
339        tokio::spawn(async move {
340            let mut reader = BufReader::new(stdout).lines();
341            tracing::info!(block_id = %block_id_stdout, "ACP stdout_reader started");
342
343            loop {
344                let line = match reader.next_line().await {
345                    Err(e) => {
346                        tracing::warn!(block_id = %block_id_stdout, error = %e, "ACP stdout read error");
347                        break;
348                    }
349                    Ok(None) => {
350                        tracing::info!(block_id = %block_id_stdout, "ACP stdout EOF");
351                        break;
352                    }
353                    Ok(Some(l)) => l,
354                };
355                if line.is_empty() {
356                    continue;
357                }
358
359                // Parse as JSON to check for session/update notifications
360                if let Ok(json) = serde_json::from_str::<serde_json::Value>(&line) {
361                    // Extract session ID from initialize result or session/create result
362                    if let Some(result) = json.get("result") {
363                        if let Some(sid) = result.get("sessionId").and_then(|v| v.as_str()) {
364                            let mut inner = inner_clone.lock().unwrap();
365                            inner.session_id = Some(sid.to_string());
366                            tracing::info!(
367                                block_id = %block_id_stdout,
368                                session_id = %sid,
369                                "ACP session established"
370                            );
371
372                            // Flush pending prompt now that session is ready.
373                            if let Some(prompt) = inner.pending_prompt.take() {
374                                let id = rpc_id_clone.fetch_add(1, Ordering::Relaxed);
375                                let req = serde_json::json!({
376                                    "jsonrpc": "2.0",
377                                    "id": id,
378                                    "method": "session/prompt",
379                                    "params": {
380                                        "sessionId": sid,
381                                        "prompt": { "type": "text", "text": prompt },
382                                    }
383                                }).to_string();
384                                if let Some(ref tx) = inner.stdin_tx {
385                                    let _ = tx.try_send(req);
386                                }
387                            }
388                        }
389                    }
390
391                    // Reset health monitor on prompt result (turn complete)
392                    if json.get("id").is_some() && json.get("result").is_some() {
393                        // This is a response to a request (e.g., session/prompt result)
394                        if let Some(result) = json.get("result") {
395                            if result.get("stopReason").is_some() {
396                                health_clone.set_active_turn(false);
397                            }
398                        }
399                    }
400                }
401
402                // Persist + broadcast via the shared helper (same as subprocess.rs)
403                if let Some(ref broker) = broker_clone {
404                    let line_with_newline = format!("{}\n", line);
405                    super::shell::handle_append_block_file(
406                        broker,
407                        &block_id_stdout,
408                        ACP_OUTPUT_SUBJECT,
409                        line_with_newline.as_bytes(),
410                        filestore_clone.as_ref(),
411                    );
412                }
413            }
414        });
415
416        // Spawn process waiter task
417        let block_id_wait = self.block_id.clone();
418        let inner_wait = self.inner.clone();
419        let broker_wait = self.broker.clone();
420        let health_wait = self.health_monitor.clone();
421        tokio::spawn(async move {
422            tokio::select! {
423                _ = kill_rx => {
424                    let _ = child.kill().await;
425                    tracing::info!(block_id = %block_id_wait, "ACP process killed");
426
427                    let mut inner = inner_wait.lock().unwrap();
428                    inner.stdin_tx = None;
429                    inner.current_pid = None;
430                    AcpController::set_status(&mut inner, STATUS_DONE);
431                    drop(inner);
432
433                    health_wait.set_active_turn(false);
434
435                    if let Some(ref broker) = broker_wait {
436                        let status = BlockControllerRuntimeStatus {
437                            blockid: block_id_wait.clone(),
438                            version: 0,
439                            shellprocstatus: STATUS_DONE.to_string(),
440                            shellprocconnname: "local".to_string(),
441                            shellprocexitcode: -1,
442                            spawn_ts_ms: None,
443                            is_agent_pane: true,
444                        };
445                        super::publish_controller_status(broker, &status);
446                    }
447                }
448                status = child.wait() => {
449                    let exit_code = status.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1);
450                    tracing::info!(
451                        block_id = %block_id_wait,
452                        exit_code = exit_code,
453                        "ACP process exited"
454                    );
455                    let mut inner = inner_wait.lock().unwrap();
456                    inner.proc_exit_code = exit_code;
457                    inner.stdin_tx = None;
458                    AcpController::set_status(&mut inner, STATUS_DONE);
459                    drop(inner);
460
461                    health_wait.set_active_turn(false);
462
463                    if let Some(ref broker) = broker_wait {
464                        let status = BlockControllerRuntimeStatus {
465                            blockid: block_id_wait.clone(),
466                            version: 0,
467                            shellprocstatus: STATUS_DONE.to_string(),
468                            shellprocconnname: "local".to_string(),
469                            shellprocexitcode: exit_code,
470                            spawn_ts_ms: None,
471                            is_agent_pane: true,
472                        };
473                        super::publish_controller_status(broker, &status);
474                    }
475                }
476            }
477        });
478
479        // Send ACP initialize handshake
480        let init_req = self.make_request("initialize", serde_json::json!({
481            "clientInfo": {
482                "name": "AgentMux",
483                "version": env!("CARGO_PKG_VERSION"),
484            },
485            "capabilities": {
486                "tools": true,
487                "fileAccess": true,
488            },
489            "workspaceRoots": [working_dir],
490        }));
491        let init_notification = self.make_notification("initialized", serde_json::json!({}));
492        let session_req = self.make_request("session/create", serde_json::json!({
493            "cwd": working_dir,
494        }));
495
496        // Queue the handshake messages
497        let inner = self.inner.lock().unwrap();
498        if let Some(ref tx) = inner.stdin_tx {
499            let _ = tx.try_send(init_req);
500            let _ = tx.try_send(init_notification);
501            let _ = tx.try_send(session_req);
502        }
503
504        Ok(())
505    }
506}
507
508impl Controller for AcpController {
509    fn start(
510        &self,
511        block_meta: super::super::obj::MetaMapType,
512        _rt_opts: Option<serde_json::Value>,
513        _force: bool,
514    ) -> Result<(), String> {
515        // Extract spawn config from block metadata
516        let cmd = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD, "");
517        let cwd = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD_CWD, "");
518        let args_str = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD_ARGS, "[]");
519        let env_str = super::super::obj::meta_get_string(&block_meta, super::META_KEY_CMD_ENV, "{}");
520
521        if cmd.is_empty() {
522            return Err("ACP controller: no cmd specified in block meta".to_string());
523        }
524
525        let args: Vec<String> = serde_json::from_str(&args_str).unwrap_or_default();
526        let env_vars: HashMap<String, String> = serde_json::from_str(&env_str).unwrap_or_default();
527
528        self.spawn_process(cmd, args, cwd, env_vars)
529    }
530
531    fn stop(&self, _graceful: bool, _new_status: &str) -> Result<(), String> {
532        // Send shutdown request before killing
533        {
534            let inner = self.inner.lock().unwrap();
535            if let Some(ref tx) = inner.stdin_tx {
536                let shutdown = self.make_request("shutdown", serde_json::json!({}));
537                let exit = self.make_notification("exit", serde_json::json!({}));
538                let _ = tx.try_send(shutdown);
539                let _ = tx.try_send(exit);
540            }
541        }
542
543        // Kill the process
544        let kill_tx = {
545            let mut inner = self.inner.lock().unwrap();
546            inner.stdin_tx = None;
547            inner.kill_tx.take()
548        };
549        if let Some(tx) = kill_tx {
550            let _ = tx.send(true);
551        }
552
553        {
554            let mut inner = self.inner.lock().unwrap();
555            Self::set_status(&mut inner, STATUS_DONE);
556        }
557        self.publish_status();
558        Ok(())
559    }
560
561    fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
562        self.get_status_snapshot()
563    }
564
565    fn send_input(&self, input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
566        if let Some(data) = input.input_data {
567            // Raw input from the frontend — treat as a user message.
568            // The frontend sends the user prompt as UTF-8 bytes.
569            let message = String::from_utf8_lossy(&data).to_string();
570            if message.trim().is_empty() {
571                return Ok(());
572            }
573
574            if !self.is_running() {
575                // Process not running — stash as pending so start() picks it up.
576                let mut inner = self.inner.lock().unwrap();
577                inner.pending_prompt = Some(message);
578                return Err("ACP process not running — message queued for next start()".to_string());
579            }
580
581            let session_id = {
582                let inner = self.inner.lock().unwrap();
583                inner.session_id.clone().unwrap_or_default()
584            };
585            let req = self.make_request("session/prompt", serde_json::json!({
586                "sessionId": session_id,
587                "prompt": {
588                    "type": "text",
589                    "text": message,
590                }
591            }));
592            self.health_monitor.set_active_turn(true);
593            let inner = self.inner.lock().unwrap();
594            if let Some(ref tx) = inner.stdin_tx {
595                tx.try_send(req)
596                    .map_err(|e| format!("ACP stdin send failed: {e}"))?;
597            }
598        }
599
600        if let Some(sig) = input.sig_name {
601            if sig == "SIGTERM" || sig == "SIGINT" {
602                return self.stop(true, STATUS_DONE);
603            }
604        }
605
606        Ok(())
607    }
608
609    fn controller_type(&self) -> &str {
610        BLOCK_CONTROLLER_ACP
611    }
612
613    fn block_id(&self) -> &str {
614        &self.block_id
615    }
616
617    fn as_any(&self) -> &dyn std::any::Any {
618        self
619    }
620}